TaskResultGetter
TaskResultGetter is a helper class of TaskSchedulerImpl for asynchronous deserialization of the results of tasks that have finished successfully (possibly fetching remote blocks) or the failure reasons of tasks that have failed.
| Caution | FIXME Image with the dependencies |
| Tip | Consult Task States in Tasks to learn about the different task states. |
| Note | The only instance of TaskResultGetter is created while TaskSchedulerImpl is created. |
TaskResultGetter requires a SparkEnv and a TaskSchedulerImpl to be created, and is stopped when TaskSchedulerImpl stops.
TaskResultGetter uses task-result-getter asynchronous task executor to run its deserialization work.
| Tip | Enable DEBUG logging for org.apache.spark.scheduler.TaskResultGetter by adding the following line to the logging configuration: log4j.logger.org.apache.spark.scheduler.TaskResultGetter=DEBUG. Refer to Logging. |
task-result-getter Asynchronous Task Executor
getTaskResultExecutor: ExecutorService
getTaskResultExecutor creates a daemon thread pool with spark.resultGetter.threads threads and the task-result-getter thread name prefix.
| Tip | Read up on java.util.concurrent.ThreadPoolExecutor that getTaskResultExecutor uses under the covers. |
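The following is a minimal, self-contained sketch (not Spark's actual code) of such an executor built with java.util.concurrent: a fixed-size pool of daemon threads whose names carry the task-result-getter prefix. The numThreads parameter and the newDaemonFixedThreadPool helper are illustrative stand-ins for the spark.resultGetter.threads setting and Spark's thread utilities.

import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

object TaskResultExecutorSketch {
  // Illustrative helper: a fixed pool of daemon threads named <prefix>-0, <prefix>-1, ...
  def newDaemonFixedThreadPool(numThreads: Int, prefix: String): ExecutorService = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
        t.setDaemon(true) // daemon threads never keep the driver JVM alive
        t
      }
    }
    Executors.newFixedThreadPool(numThreads, factory)
  }

  // numThreads plays the role of spark.resultGetter.threads here.
  val getTaskResultExecutor: ExecutorService =
    newDaemonFixedThreadPool(numThreads = 4, prefix = "task-result-getter")
}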
serializer Attribute
serializer: ThreadLocal[SerializerInstance]
serializer is a thread-local SerializerInstance that TaskResultGetter uses to deserialize byte buffers (with TaskResults or a TaskEndReason).
When first used by a new thread, serializer is initialized with a new SerializerInstance (created using SparkEnv.closureSerializer).
| Note | TaskResultGetter uses java.lang.ThreadLocal for the thread-local SerializerInstance variable. |
taskResultSerializer Attribute
taskResultSerializer: ThreadLocal[SerializerInstance]
taskResultSerializer is a thread-local SerializerInstance that TaskResultGetter uses to deserialize DirectTaskResults (i.e. the task result values).
When first used by a new thread, taskResultSerializer is initialized with a new SerializerInstance (created using SparkEnv.serializer).
| Note | TaskResultGetter uses java.lang.ThreadLocal for the thread-local SerializerInstance variable. |
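Both attributes follow the same thread-local pattern. The sketch below uses simplified stand-in types (SerializerSketch and SerializerInstanceSketch are illustrative, not Spark's classes) to show how each thread gets its own serializer instance on first access, one created from the closure serializer and one from the regular serializer.

import java.nio.ByteBuffer

// Illustrative stand-ins for Spark's Serializer and SerializerInstance.
trait SerializerInstanceSketch {
  def deserialize[T](bytes: ByteBuffer): T
}
trait SerializerSketch {
  def newInstance(): SerializerInstanceSketch
}

class ThreadLocalSerializers(
    closureSerializer: SerializerSketch, // plays the role of SparkEnv.closureSerializer
    plainSerializer: SerializerSketch) { // plays the role of SparkEnv.serializer

  // One SerializerInstance per task-result-getter thread, created lazily on first use.
  val serializer: ThreadLocal[SerializerInstanceSketch] =
    new ThreadLocal[SerializerInstanceSketch] {
      override def initialValue(): SerializerInstanceSketch = closureSerializer.newInstance()
    }

  val taskResultSerializer: ThreadLocal[SerializerInstanceSketch] =
    new ThreadLocal[SerializerInstanceSketch] {
      override def initialValue(): SerializerInstanceSketch = plainSerializer.newInstance()
    }
}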
Deserializing Task Result and Notifying TaskSchedulerImpl — enqueueSuccessfulTask Method
enqueueSuccessfulTask(
taskSetManager: TaskSetManager,
tid: Long,
serializedData: ByteBuffer): Unit
enqueueSuccessfulTask submits an asynchronous task (to task-result-getter asynchronous task executor) that first deserializes serializedData to a DirectTaskResult, then updates the internal accumulator (with the size of the DirectTaskResult) and ultimately notifies TaskSchedulerImpl that the tid task has completed and whether the task result was received successfully.
| Note | enqueueSuccessfulTask is just the asynchronous task enqueued for execution by task-result-getter asynchronous task executor at some point in the future. |
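A minimal sketch of the enqueueing itself, with illustrative placeholders (handleSuccessfulTask is not Spark's API): the method only hands a Runnable over to the executor and returns immediately.

import java.nio.ByteBuffer
import java.util.concurrent.ExecutorService

object EnqueueSuccessfulTaskSketch {
  def enqueueSuccessfulTask(
      executor: ExecutorService,
      tid: Long,
      serializedData: ByteBuffer)(
      handleSuccessfulTask: (Long, ByteBuffer) => Unit): Unit = {
    // The caller returns right away; deserialization and the notification of the
    // scheduler happen later on one of the task-result-getter threads.
    executor.execute(new Runnable {
      override def run(): Unit = handleSuccessfulTask(tid, serializedData)
    })
  }
}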
Internally, the enqueued task first deserializes serializedData to a TaskResult (using the internal thread-local serializer).
The TaskResult could be a DirectTaskResult or an IndirectTaskResult.
For a DirectTaskResult, the task checks the available memory for the task result and, when the size overflows spark.driver.maxResultSize, it simply returns.
| Note | The enqueued task is a mere thread, so returning from it does nothing more. That is why the quota check itself has to abort when there is not enough memory for the task result. |
Otherwise, when there is enough memory to hold the task result, it deserializes the DirectTaskResult (using the internal thread-local taskResultSerializer).
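The size check described above can be sketched as follows; ResultSizeTracker is an illustrative stand-in for the bookkeeping done on the TaskSetManager side, and maxResultSizeBytes plays the role of spark.driver.maxResultSize.

// Illustrative sketch: a result is processed further only while the running total
// of fetched result sizes stays within the configured limit.
class ResultSizeTracker(maxResultSizeBytes: Long) {
  private var totalResultSize = 0L

  // Returns false once the quota is exceeded; the caller then gives up on the
  // result and simply returns.
  def canFetchMoreResults(resultSizeBytes: Long): Boolean = synchronized {
    totalResultSize += resultSizeBytes
    totalResultSize <= maxResultSizeBytes
  }
}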
For an IndirectTaskResult, the task checks the available memory for the task result and, when the size could overflow the maximum result size, it removes the block and simply returns.
Otherwise, when there is enough memory to hold the task result, you should see the following DEBUG message in the logs:
DEBUG Fetching indirect task result for TID [tid]
The task notifies TaskSchedulerImpl that it is about to fetch a remote block for a task result. It then gets the block from remote block managers (as serialized bytes).
When the block could not be fetched, TaskSchedulerImpl is informed (with TaskResultLost task failure reason) and the task simply returns.
| Note | The enqueued task is a mere thread, so returning from it does nothing more; the real handling happens when TaskSchedulerImpl is informed. |
The task result (as a serialized byte buffer) is then deserialized to a DirectTaskResult (using the internal thread-local serializer) and deserialized again using the internal thread-local taskResultSerializer (just like in the DirectTaskResult case). The block is then removed from BlockManagerMaster.
| Note | An IndirectTaskResult is deserialized twice (using serializer) before it becomes the final DirectTaskResult. Compare that to a DirectTaskResult, which is deserialized only once. |
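The two-step deserialization of an indirect result can be sketched as follows; the types and the deserialize/remove callbacks are simplified stand-ins rather than Spark's API.

import java.nio.ByteBuffer

object IndirectResultSketch {
  // Simplified stand-in for DirectTaskResult: it still carries serialized value bytes.
  final case class DirectResult(valueBytes: ByteBuffer)

  def deserializeIndirectResult(
      fetchedBlock: ByteBuffer,
      deserializeDirect: ByteBuffer => DirectResult, // plays the role of the serializer thread-local
      deserializeValue: ByteBuffer => Any,           // plays the role of the taskResultSerializer thread-local
      removeBlock: () => Unit): Any = {
    val direct = deserializeDirect(fetchedBlock)      // 1st pass: fetched bytes -> DirectTaskResult
    val value  = deserializeValue(direct.valueBytes)  // 2nd pass: value bytes -> task result value
    removeBlock()                                     // the remote block is no longer needed
    value
  }
}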
With no exceptions thrown, enqueueSuccessfulTask informs the TaskSchedulerImpl that the tid task was completed and the task result was received.
A ClassNotFoundException leads to aborting the TaskSet (with the ClassNotFound with classloader: [loader] error message), while any non-fatal exception prints the following ERROR message in the logs and then aborts the TaskSet.
ERROR Exception while getting task result
| Note | enqueueSuccessfulTask is called when TaskSchedulerImpl is notified about a task that has finished successfully (i.e. in FINISHED state). |
Deserializing TaskFailedReason and Notifying TaskSchedulerImpl — enqueueFailedTask Method
enqueueFailedTask(
taskSetManager: TaskSetManager,
tid: Long,
taskState: TaskState.TaskState,
serializedData: ByteBuffer): Unit
enqueueFailedTask submits an asynchronous task (to task-result-getter asynchronous task executor) that first attempts to deserialize a TaskFailedReason from serializedData (using the internal thread-local serializer) and then notifies TaskSchedulerImpl that the task has failed.
Any ClassNotFoundException leads to the following ERROR message in the logs (without breaking the flow of enqueueFailedTask):
ERROR Could not deserialize TaskEndReason: ClassNotFound with classloader [loader]
| Note | enqueueFailedTask is called when TaskSchedulerImpl is notified about a task that has failed (and is in FAILED, KILLED or LOST state). |
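A minimal sketch of this failure path, with illustrative placeholders (deserializeReason and notifyFailed are not Spark's API): the enqueued task falls back to a generic reason when the failure reason cannot be deserialized and always ends by notifying the scheduler.

import java.nio.ByteBuffer
import java.util.concurrent.ExecutorService

object EnqueueFailedTaskSketch {
  def enqueueFailedTask(
      executor: ExecutorService,
      tid: Long,
      serializedData: ByteBuffer)(
      deserializeReason: ByteBuffer => String,
      notifyFailed: (Long, String) => Unit): Unit = {
    executor.execute(new Runnable {
      override def run(): Unit = {
        // A ClassNotFoundException is logged as an ERROR but does not break the flow:
        // the scheduler is notified with a generic reason instead.
        val reason =
          try deserializeReason(serializedData)
          catch { case _: ClassNotFoundException => "UnknownReason" }
        notifyFailed(tid, reason)
      }
    })
  }
}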